Execution Pools

Thread pool

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution in a thread pool.
"""

import sys

from testplan import test_plan
from testplan import Task
from testplan.parser import TestplanParser
from testplan.runners.pools.base import Pool as ThreadPool
from testplan.report.testing.styles import Style, StyleEnum


OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument(
            "--tasks-num",
            action="store",
            type=int,
            default=8,
            help="Number of tests to be scheduled.",
        )
        parser.add_argument(
            "--pool-size",
            action="store",
            type=int,
            default=4,
            help="How many thread workers assigned to pool.",
        )


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with thread pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="ThreadPoolExecution",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a thread pool test execution resource to the plan of given size.
    pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a given number of similar tests to the thread pool
    # to be executed in parallel.
    for idx in range(plan.args.tasks_num):
        task = Task(
            target="make_multitest", module="tasks", kwargs={"index": idx}
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""TCP connections tests to be executed in parallel in a thread pool."""

import threading
import time

from testplan.testing.multitest import MultiTest, testsuite, testcase

from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request made by client.
    env.server.accept_connection()


@testsuite
class TCPTestsuite:
    """TCP communication tests."""

    def __init__(self):
        self._thread_id = threading.current_thread().name

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Server client communication with a sleep in the middle that
        represents processing time by the server before respond.
        """
        # Client sends a message.
        msg = env.client.cfg.name
        result.log(
            "Client on thread {} is sending: {}".format(self._thread_id, msg)
        )
        bytes_sent = env.client.send_text(msg)
        received = env.server.receive_text(size=bytes_sent)
        result.equal(received, msg, "Server received")

        start_time = time.time()
        # Sleeping here to represent a time consuming processing
        # of the message received by the server before replying back.
        time.sleep(1)
        result.log(
            "Server was processing message for {}s".format(
                round(time.time() - start_time, 1)
            )
        )
        response = "Hello {}".format(received)

        result.log(
            "Server on thread {} is responding: {}".format(
                self._thread_id, response
            )
        )
        # Server sends the reply.
        bytes_sent = env.server.send_text(response)
        received = env.client.receive_text(size=bytes_sent)
        result.equal(received, response, "Client received")


def make_multitest(index=0):
    """
    Creates a new MultiTest that runs TCP connection tests.
    This will be created inside a thread worker.
    """
    print(
        "Creating a MultiTest on {}.".format(threading.current_thread().name)
    )
    test = MultiTest(
        name="TCPMultiTest_{}".format(index),
        suites=[TCPTestsuite()],
        environment=[
            TCPServer(name="server"),
            TCPClient(
                name="client",
                host=context("server", "{{host}}"),
                port=context("server", "{{port}}"),
            ),
        ],
        after_start=after_start,
    )
    return test

Process pool

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution in a process pool.
"""

import sys

from testplan import test_plan
from testplan import Task
from testplan.runners.pools.process import ProcessPool

from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument("--tasks-num", action="store", type=int, default=8)
        parser.add_argument("--pool-size", action="store", type=int, default=4)


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with process pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="ProcessPoolExecution",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a process pool test execution resource to the plan of given size.
    pool = ProcessPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a given number of similar tests to the process pool
    # to be executed in parallel.
    for idx in range(plan.args.tasks_num):
        # All Task arguments need to be serializable.
        task = Task(
            target="make_multitest", module="tasks", kwargs={"index": idx}
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""TCP connections tests to be executed in parallel in a process pool."""

import os
import time

from testplan.testing.multitest import MultiTest, testsuite, testcase

from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request made by client.
    env.server.accept_connection()


@testsuite
class TCPTestsuite:
    """TCP communication tests."""

    def __init__(self):
        self._process_id = os.getpid()

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Server client communication with a sleep in the middle that
        represents processing time by the server before respond.
        """
        # Client sends a message.
        msg = env.client.cfg.name
        result.log(
            "Client with process id {} is sending: {}".format(
                self._process_id, msg
            )
        )
        bytes_sent = env.client.send_text(msg)
        received = env.server.receive_text(size=bytes_sent)
        result.equal(received, msg, "Server received")

        start_time = time.time()
        # Sleeping here to represent a time consuming processing
        # of the message received by the server before replying back.
        time.sleep(1)
        result.log(
            "Server was processing message for {}s".format(
                round(time.time() - start_time, 1)
            )
        )
        response = "Hello {}".format(received)

        result.log(
            "Server with process id {} is responding: {}".format(
                self._process_id, response
            )
        )
        # Server sends the reply.
        bytes_sent = env.server.send_text(response)
        received = env.client.receive_text(size=bytes_sent)
        result.equal(received, response, "Client received")


def make_multitest(index=0):
    """
    Creates a new MultiTest that runs TCP connection tests.
    This will be created inside a process worker.
    """
    test = MultiTest(
        name="TCPMultiTest_{}".format(index),
        suites=[TCPTestsuite()],
        environment=[
            TCPServer(name="server"),
            TCPClient(
                name="client",
                host=context("server", "{{host}}"),
                port=context("server", "{{port}}"),
            ),
        ],
        after_start=after_start,
    )
    return test

Remote pool

Required files:

test_plan.py

#!/usr/bin/env python
# This plan contains tests that demonstrate failures as well.
"""
Parallel test execution in a remote pool.
"""

import os
import sys
import getpass
import shutil
import tempfile

# Check if the remote host has been specified in the environment. Remote
# hosts can only be Linux systems.
REMOTE_HOST = os.environ.get("TESTPLAN_REMOTE_HOST")
if not REMOTE_HOST:
    raise RuntimeError(
        "You must specify a remote Linux host via the TESTPLAN_REMOTE_HOST "
        "environment var to run this example."
    )

from testplan import test_plan
from testplan import Task
from testplan.runners.pools.remote import RemotePool

from testplan.common.utils.path import pwd

from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
TEMP_DIR = None


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument("--tasks-num", action="store", type=int, default=8)
        parser.add_argument("--pool-size", action="store", type=int, default=4)


# Function that creates a file with some content
# to demonstrate custom file transferring.
def make_file(filename, dirname, content):
    path = os.path.join(dirname, filename)
    with open(path, "w") as fobj:
        fobj.write(content)
    return path


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with remote pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="RemotePoolExecution",
    parser=CustomParser,
    pdf_path=os.path.join(pwd(), "report.pdf"),
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """

    workspace = os.path.dirname(__file__)

    # Create two temporary files locally. For demonstration, just write the
    # filename as the content of each.
    assert TEMP_DIR is not None
    for filename in ("file1", "file2"):
        make_file(filename, TEMP_DIR, content=filename)

    # Explicitly specify the full paths to both the local source files just
    # created and the destination filepaths on the remote host.
    push_files = [
        (os.path.join(TEMP_DIR, "file1"), "/tmp/remote_example/file1"),
        (os.path.join(TEMP_DIR, "file2"), "/tmp/remote_example/file2"),
    ]

    # Add a remote pool test execution resource to the plan of given size.
    pool = RemotePool(
        name="MyPool",
        # Create 3 workers on the same remote host.
        hosts={REMOTE_HOST: 3},
        # Allow the remote port to be overridden by the
        # environment. Default to 0, which will make testplan use
        # the default SSH port for connections.
        port=int(os.environ.get("TESTPLAN_REMOTE_PORT", 0)),
        setup_script=["/bin/bash", "setup_script.ksh"],
        env={"LOCAL_USER": getpass.getuser(), "LOCAL_WORKSPACE": workspace},
        workspace_exclude=[".git/", ".cache/", "doc/", "test/"],
        # We push local files to the remote worker using the
        # explicit source and destination locations defined above.
        push=push_files,
        workspace=workspace,
        clean_remote=True,
    )

    plan.add_resource(pool)

    # Add a given number of similar tests to the remote pool
    # to be executed in parallel.
    for idx in range(plan.args.tasks_num):
        # All Task arguments need to be serializable.
        task = Task(
            target="make_multitest",
            module="tasks",
            # We specify the full paths to files as they will be found
            # on the remote host.
            kwargs={
                "index": idx,
                "files": [
                    "/tmp/remote_example/file1",
                    "/tmp/remote_example/file2",
                ],
            },
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    # Create a new temporary directory for this test plan.
    TEMP_DIR = tempfile.mkdtemp()

    # Run the test plan.
    res = main()

    # Clean up all the temporary files used by this test plan.
    shutil.rmtree(TEMP_DIR)

    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""TCP connections tests to be executed in parallel in a remote pool."""

import os
import time

from testplan.testing.multitest import MultiTest, testsuite, testcase

from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request made by client.
    env.server.accept_connection()


@testsuite
class TCPTestsuite:
    """TCP communication tests."""

    def __init__(self, files):
        self._process_id = os.getpid()
        self._files = files

    def setup(self, env, result):
        result.log("LOCAL_USER: {}".format(os.environ["LOCAL_USER"]))

        for _file in self._files:
            with open(_file) as fobj:
                result.log("Source file contents: {}".format(fobj.read()))

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Server client communication with a sleep in the middle that
        represents processing time by the server before respond.
        """
        # Client sends a message.
        msg = env.client.cfg.name
        result.log(
            "Client with process id {} is sending: {}".format(
                self._process_id, msg
            )
        )
        bytes_sent = env.client.send_text(msg)
        received = env.server.receive_text(size=bytes_sent)
        result.equal(received, msg, "Server received")

        start_time = time.time()
        # Sleeping here to represent a time consuming processing
        # of the message received by the server before replying back.
        time.sleep(1)
        result.log(
            "Server was processing message for {}s".format(
                round(time.time() - start_time, 1)
            )
        )
        response = "Hello {}".format(received)

        result.log(
            "Server with process id {} is responding: {}".format(
                self._process_id, response
            )
        )
        # Server sends the reply.
        bytes_sent = env.server.send_text(response)
        received = env.client.receive_text(size=bytes_sent)
        result.equal(received, response, "Client received")


def make_multitest(index=0, files=None):
    """
    Creates a new MultiTest that runs TCP connection tests.
    This will be created inside a remote worker.
    """
    test = MultiTest(
        name="TCPMultiTest_{}".format(index),
        suites=[TCPTestsuite(files)],
        environment=[
            TCPServer(name="server"),
            TCPClient(
                name="client",
                host=context("server", "{{host}}"),
                port=context("server", "{{port}}"),
            ),
        ],
        after_start=after_start,
    )
    return test

setup_script.ksh

#!/bin/bash
echo 'Executing setup commands.'

echo 'Environment:'
env

echo 'User:'
echo $USER

echo 'Hostname:'
hostname

# Make a soft link in remote host to make the local workspace
# absolute path available for hardcoded entries.
if [ ! -d $TESTPLAN_LOCAL_WORKSPACE ];
then
    mkdir -p `dirname $TESTPLAN_LOCAL_WORKSPACE`
    ln -s $TESTPLAN_REMOTE_WORKSPACE $TESTPLAN_LOCAL_WORKSPACE
else
    echo 'Local workspace is visible from the remote!'
fi

echo 'Finished setup commands.'
exit 0

Task Rerun

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate task level rerun feature in a pool.
"""

import os
import sys
import uuid
import getpass
import tempfile

from testplan import test_plan
from testplan import Task
from testplan.runners.pools.base import Pool as ThreadPool

from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument("--pool-size", action="store", type=int, default=4)


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with process pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="PoolExecutionAndTaskRerun",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a thread pool test execution resource to the plan of given size.
    # Can also use a process pool instead.
    pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a task with `rerun` argument to the thread pool
    tmp_file = os.path.join(
        tempfile.gettempdir(), getpass.getuser(), "{}.tmp".format(uuid.uuid4())
    )
    task = Task(
        target="make_multitest", module="tasks", args=(tmp_file,), rerun=2
    )
    plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""An unstable tests to be executed until pass."""

import os

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.common.utils.path import makedirs


@testsuite
class Unstablesuite:
    """
    A test suite which has an unstable testcase.
    The multitest containing this suite has to re-run twice
    (3 times in total) in order to get passed.
    """

    def __init__(self, tmp_file):
        self._iteration = None
        self._max_rerun = 2
        self._tmp_file = tmp_file

    def setup(self, env, result):
        """
        Create a tmp text file which record how many times the suite
        has been executed, should remove it after the last rerun.
        """
        makedirs(os.path.dirname(self._tmp_file))
        if not os.path.exists(self._tmp_file):
            self._iteration = 0
        else:
            with open(self._tmp_file, "r") as fp:
                self._iteration = int(fp.read())

        if self._iteration == self._max_rerun:
            os.remove(self._tmp_file)
        else:
            with open(self._tmp_file, "w") as fp:
                fp.write(str(self._iteration + 1))

        result.log("Suite setup in iteration {}".format(self._iteration))

    @testcase
    def unstable_testcase(self, env, result):
        """
        An unstable testcase which can only pass at 3rd run (2nd rerun).
        """
        if self._iteration == 2:
            result.log("Test passes")
        else:
            result.fail("Test fails")


def make_multitest(tmp_file):
    """
    Creates a new MultiTest that runs unstable tests.
    """
    return MultiTest(
        name="UnstableMultiTest",
        suites=[Unstablesuite(tmp_file=tmp_file)],
        environment=[],
    )

MultiTest parts scheduling

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution with parts in a pool.
"""

import sys

from testplan import test_plan
from testplan import Task
from testplan.parser import TestplanParser
from testplan.runners.pools.base import Pool as ThreadPool
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument(
            "--parts-num",
            action="store",
            type=int,
            default=3,
            help="Number of parts to be split.",
        )
        parser.add_argument(
            "--pool-size",
            action="store",
            type=int,
            default=3,
            help="How many thread workers assigned to pool.",
        )


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with thread pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="MultiTestPartsExecution",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
    merge_scheduled_parts=False,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a thread pool test execution resource to the plan of given size.
    # Also you can use process pool or remote pool instead.
    pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a given number of similar tests to the thread pool
    # to be executed in parallel.
    for idx in range(plan.args.parts_num):
        task = Task(
            target="make_multitest",
            module="tasks",
            kwargs={"part_tuple": (idx, plan.args.parts_num)},
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""Tests to be executed in multi-parts and task will be schedule in a pool."""

from testplan.testing.multitest import MultiTest, testsuite, testcase


@testsuite
class Suite1:
    """A test suite with several normal testcases."""

    @testcase
    def test_equal(self, env, result):
        result.equal("foo", "foo", description="Equality example")

    @testcase
    def test_not_equal(self, env, result):
        result.not_equal("foo", "bar", description="Inequality example")

    @testcase
    def test_less(self, env, result):
        result.less(2, 12, description="Less comparison example")

    @testcase
    def test_greater(self, env, result):
        result.greater(10, 5, description="Greater comparison example")

    @testcase
    def test_approximate_equal(self, env, result):
        result.isclose(
            100,
            101,
            rel_tol=0.01,
            abs_tol=0.0,
            description="Approximate equality example",
        )


@testsuite
class Suite2:
    """A test suite with parameterized testcases."""

    @testcase(parameters=tuple(range(6)))
    def test_bool(self, env, result, val):
        if val > 0:
            result.true(val, description="Check if value is true")
        else:
            result.false(val, description="Check if value is false")


def make_multitest(part_tuple=None):
    test = MultiTest(
        name="MultitestParts", suites=[Suite1(), Suite2()], part=part_tuple
    )
    return test

Task discover

This example requires a file structure demonstrated as below. In this example, @task_target annotated functions defined under sub-projects will be discovered when plan.schedule_all is called, without requiring user to specify target/module/path separately for each task.

|~sub_proj1/
| |-__init__.py
| |-suites.py
| `-tasks.py
|~sub_proj2/
| |-__init__.py
| |-suites.py
| `-tasks.py
`-test_plan.py*
Required files:

test_plan.py

#!/usr/bin/env python
"""
Discover and schedule tasks that spread across the project for parallel execution in Pool.
"""

import sys

from testplan import test_plan
from testplan.runners.pools.process import ProcessPool
from testplan.report.testing.styles import Style, StyleEnum


OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
@test_plan(
    name="TaskDiscovery",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
    merge_scheduled_parts=True,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a process pool test execution resource to the plan of given size.
    # Also you can use thread pool or remote pool instead.
    pool = ProcessPool(name="MyPool")
    plan.add_resource(pool)

    # Create task objects from all @task_target we could find in the modules
    # that matches the name pattern under the specified path, and schedule them
    # to MyPool.

    plan.schedule_all(
        path=".", name_pattern=r".*tasks\.py$", resource="MyPool"
    )


if __name__ == "__main__":
    res = main()
    assert len(res.report.entries) == 5
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

sub_proj1/tasks.py

"""A discoverable task target that will be instanciated to multiple task objects"""

from testplan.runners.pools.tasks.base import task_target
from testplan.testing.multitest import MultiTest

# need to use full path specification of the module
import sub_proj1.suites

# each entry in parameters will be used to create one task object
@task_target(
    parameters=(
        # positional args to be passed to target, as a tuple or list
        ("Proj1-Suite2", None, [sub_proj1.suites.Suite2]),
        # keyword args to be passed to target, as a dict
        dict(
            name="Proj1-Suite1",
            part_tuple=(0, 2),
            suites=[sub_proj1.suites.Suite1],
        ),
        dict(
            name="Proj1-Suite1",
            part_tuple=(1, 2),
            suites=[sub_proj1.suites.Suite1],
        ),
    ),
    # additional args of Task class
    rerun=1,
    weight=1,
)
def make_multitest(name, part_tuple=None, suites=None):
    # a test target shall only return 1 runnable object
    test = MultiTest(
        name=name, suites=[cls() for cls in suites], part=part_tuple
    )
    return test


# an alternative way of specifying parts for multitest
@task_target(
    parameters=(
        dict(
            name="Proj1-Suite1-Again",
            suites=[sub_proj1.suites.Suite1],
        ),
    ),
    # instruct testplan to split each multitest task into parts
    multitest_parts=2,
    # additional args of Task class
    rerun=1,
    weight=1,
)
def make_multitest1(name, suites=None):
    # a test target shall only return 1 runnable object
    test = MultiTest(name=name, suites=[cls() for cls in suites])
    return test

sub_proj2/tasks.py

"""Discoverable task targets that will be instantiated to task objects"""
from testplan.runners.pools.tasks.base import task_target
from testplan.testing.multitest import MultiTest
from sub_proj2.suites import Suite1, Suite2


@task_target
def make_multitest1():
    # a test target shall only return 1 runnable object
    test = MultiTest(name="Proj2-Suite1", suites=[Suite1()])
    return test


@task_target
def make_multitest2():
    # a test target shall only return 1 runnable object
    test = MultiTest(name="Proj2-Suite2", suites=[Suite2()])
    return test

Auto-Part

This example requires command line argument --runtime-data runtime_data.txt.

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate auto-part feature in a process pool.
"""

import sys
from testplan import test_plan
from testplan.runners.pools.process import ProcessPool
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


# The auto_part_runtime_limit argument instructs testplan to split parts="auto"
# Multitest into optimal number of parts so that the runtime of each part
# is not more than the limit.
@test_plan(
    name="AutoPartExample",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
    auto_part_runtime_limit=100,
    plan_runtime_target=100,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Enable smart-schedule pool size
    pool = ProcessPool(name="MyPool", size="auto")

    # Add a process pool test execution resource to the plan of given size.
    plan.add_resource(pool)

    # Discover tasks and calculate the right size of the pool based on the weight (runtime) of the
    # tasks so that runtime of all tasks meets the plan_runtime_target.
    plan.schedule_all(
        path=".",
        name_pattern=r".*task\.py$",
        resource="MyPool",
    )


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

runtime_data.txt

{
  "Proj1-suite": {
    "execution_time": 199.99,
    "setup_time": 5
  }
}

sub_task.py

from testplan.runners.pools.tasks.base import task_target
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver import Driver


@testsuite
class Suite1:
    """A test suite with several normal testcases."""

    @testcase
    def test1(self, env, result):
        result.equal("foo", "foo", description="Equality example")

    @testcase
    def test2(self, env, result):
        result.not_equal("foo", "bar", description="Inequality example")

    @testcase
    def test3(self, env, result):
        result.less(2, 12, description="Less comparison example")

    @testcase
    def test4(self, env, result):
        result.greater(10, 5, description="Greater comparison example")


@testsuite
class Suite2:
    """A test suite with parameterized testcases."""

    @testcase(parameters=tuple(range(6)))
    def test_bool(self, env, result, val):
        if val > 0:
            result.true(val, description="Check if value is true")
        else:
            result.false(val, description="Check if value is false")


@task_target(multitest_parts="auto")
def make_auto_part_multitest1():
    return MultiTest(
        name="Proj1-suite",
        suites=[Suite1(), Suite2()],
        environment=[Driver("NoopDriver")],
    )